package com.boot.study.sink

import com.boot.study.api.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._


/**
 * Demo job: reads comma-separated sensor records from a text file, parses each
 * line into a [[SensorReading]], prints the records, and writes them out
 * through a row-format [[StreamingFileSink]].
 *
 * Usage: `FileSinkTest [inputPath] [outputPath]`. Both arguments are optional;
 * when omitted, the original hard-coded locations are used.
 */
object FileSinkTest {

  /** Input file used when no first program argument is supplied. */
  private val DefaultInputPath: String =
    "D:\\WorkSpace\\idea\\Flink\\src\\main\\resources\\sensor.txt"

  /** Output directory used when no second program argument is supplied. */
  private val DefaultOutputPath: String =
    "D:\\WorkSpace\\idea\\Flink\\src\\main\\resources\\out"

  /**
   * Builds and runs the job.
   *
   * @param args optional overrides: `args(0)` is the input file,
   *             `args(1)` is the output directory
   */
  def main(args: Array[String]): Unit = {
    // Resolve the locations up front so the job is not tied to one machine's layout.
    val inputPath: String = if (args.length > 0) args(0) else DefaultInputPath
    val outputPath: String = if (args.length > 1) args(1) else DefaultOutputPath

    // Create the execution environment; parallelism 1 keeps a single writer.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the file line by line.
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    // Each line is expected to hold at least three comma-separated fields.
    // Fields are trimmed because `toLong` rejects surrounding whitespace, so a
    // line with spaces after the commas would otherwise fail to parse.
    val dataStream: DataStream[SensorReading] = inputStream.map { line =>
      val fields: Array[String] = line.split(",").map(_.trim)
      SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
    }

    dataStream.print("file sink")

    // Alternative kept from the original for reference:
    //    dataStream.writeAsCsv("D:\\WorkSpace\\idea\\Flink\\src\\main\\resources\\out.txt")

    // NOTE(review): the Flink docs state that StreamingFileSink only finalizes
    // part files on successful checkpoints; checkpointing is not enabled here,
    // so output files presumably stay in the in-progress state — confirm
    // whether env.enableCheckpointing(...) is wanted for this demo.
    val fileSink: StreamingFileSink[SensorReading] = StreamingFileSink
      .forRowFormat(new Path(outputPath), new SimpleStringEncoder[SensorReading]())
      .build()
    dataStream.addSink(fileSink)

    // Submit the job.
    env.execute("file sink test")
  }
}
